1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package sun.nio.ch;
26
27 import java.net.InetAddress;
28 import java.net.SocketAddress;
29 import java.net.SocketException;
30 import java.net.InetSocketAddress;
31 import java.io.FileDescriptor;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.Set;
35 import java.util.HashSet;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.ConnectionPendingException;
40 import java.nio.channels.NoConnectionPendingException;
41 import java.nio.channels.AlreadyConnectedException;
42 import java.nio.channels.NotYetBoundException;
43 import java.nio.channels.NotYetConnectedException;
44 import java.nio.channels.spi.SelectorProvider;
45 import com.sun.nio.sctp.AbstractNotificationHandler;
46 import com.sun.nio.sctp.Association;
47 import com.sun.nio.sctp.AssociationChangeNotification;
48 import com.sun.nio.sctp.HandlerResult;
49 import com.sun.nio.sctp.IllegalReceiveException;
50 import com.sun.nio.sctp.InvalidStreamException;
51 import com.sun.nio.sctp.IllegalUnbindException;
52 import com.sun.nio.sctp.MessageInfo;
53 import com.sun.nio.sctp.NotificationHandler;
54 import com.sun.nio.sctp.SctpChannel;
55 import com.sun.nio.sctp.SctpSocketOption;
56 import sun.nio.ch.PollArrayWrapper;
57 import sun.nio.ch.SelChImpl;
58 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
59 import static sun.nio.ch.SctpResultContainer.SEND_FAILED;
60 import static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED;
61 import static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED;
62 import static sun.nio.ch.SctpResultContainer.SHUTDOWN;
63
64
65
66
67 public class SctpChannelImpl extends SctpChannel
68 implements SelChImpl
69 {
70 private final FileDescriptor fd;
71
72 private final int fdVal;
73
74
75 private volatile long receiverThread = 0;
76 private volatile long senderThread = 0;
77
78
79 private final Object receiveLock = new Object();
80
81
82 private final Object sendLock = new Object();
83
84 private final ThreadLocal<Boolean> receiveInvoked =
85 new ThreadLocal<Boolean>() {
86 @Override protected Boolean initialValue() {
87 return Boolean.FALSE;
88 }
89 };
90
91
92
93 private final Object stateLock = new Object();
94
95 private enum ChannelState {
96 UNINITIALIZED,
97 UNCONNECTED,
98 PENDING,
99 CONNECTED,
100 KILLPENDING,
101 KILLED,
102 }
103
104 private ChannelState state = ChannelState.UNINITIALIZED;
105
106
107 int port = -1;
108 private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
109
110 private boolean wildcard;
111
112
113
114 private boolean readyToConnect;
115
116
117 private boolean isShutdown;
118
119 private Association association;
120
121 private Set<SocketAddress> remoteAddresses = Collections.EMPTY_SET;
122
123
124
125
126
127
/**
 * Creates an unconnected channel backed by a socket newly obtained from
 * {@code SctpNet.socket(true)}.
 *
 * @param provider the selector provider handed to the superclass
 * @throws IOException if the socket cannot be created
 */
public SctpChannelImpl(SelectorProvider provider) throws IOException {
    super(provider);
    fd = SctpNet.socket(true);
    fdVal = IOUtil.fdVal(fd);
    state = ChannelState.UNCONNECTED;
}
135
136
137
138
/**
 * Creates a connected channel around an existing file descriptor, with no
 * association supplied; delegates to the three-argument constructor.
 *
 * @param provider the selector provider handed to the superclass
 * @param fd       file descriptor of the already connected socket
 * @throws IOException propagated from the three-argument constructor
 */
public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
        throws IOException {
    this(provider, fd, null);
}
143
144
145
146
/**
 * Creates a connected channel around an existing file descriptor.
 *
 * @param provider    the selector provider handed to the superclass
 * @param fd          file descriptor of the already connected socket
 * @param association association to use, or {@code null} to have it
 *                    discovered through a peeking receive
 * @throws IOException if the local address lookup or the peeking receive
 *                     fails
 */
public SctpChannelImpl(SelectorProvider provider,
                       FileDescriptor fd,
                       Association association)
        throws IOException {
    super(provider);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ChannelState.CONNECTED;
    port = Net.localAddress(fd).getPort();

    if (association == null) {
        // No association supplied: issue a receive in "fromConnect" mode so
        // that the internal handler can record the association from a
        // COMM_UP notification.
        ByteBuffer scratch = Util.getTemporaryDirectBuffer(50);
        try {
            receive(scratch, null, null, true);
        } finally {
            Util.releaseTemporaryDirectBuffer(scratch);
        }
    } else {
        this.association = association;
    }
}
169
170
171
172
173 @Override
174 public SctpChannel bind(SocketAddress local) throws IOException {
175 synchronized (receiveLock) {
176 synchronized (sendLock) {
177 synchronized (stateLock) {
178 ensureOpenAndUnconnected();
179 if (isBound())
180 SctpNet.throwAlreadyBoundException();
181 InetSocketAddress isa = (local == null) ?
182 new InetSocketAddress(0) : Net.checkAddress(local);
183 Net.bind(fd, isa.getAddress(), isa.getPort());
184 InetSocketAddress boundIsa = Net.localAddress(fd);
185 port = boundIsa.getPort();
186 localAddresses.add(isa);
187 if (isa.getAddress().isAnyLocalAddress())
188 wildcard = true;
189 }
190 }
191 }
192 return this;
193 }
194
195 @Override
196 public SctpChannel bindAddress(InetAddress address)
197 throws IOException {
198 bindUnbindAddress(address, true);
199 localAddresses.add(new InetSocketAddress(address, port));
200 return this;
201 }
202
203 @Override
204 public SctpChannel unbindAddress(InetAddress address)
205 throws IOException {
206 bindUnbindAddress(address, false);
207 localAddresses.remove(new InetSocketAddress(address, port));
208 return this;
209 }
210
211 private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
212 throws IOException {
213 if (address == null)
214 throw new IllegalArgumentException();
215
216 synchronized (receiveLock) {
217 synchronized (sendLock) {
218 synchronized (stateLock) {
219 if (!isOpen())
220 throw new ClosedChannelException();
221 if (!isBound())
222 throw new NotYetBoundException();
223 if (wildcard)
224 throw new IllegalStateException(
225 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
226 if (address.isAnyLocalAddress())
227 throw new IllegalArgumentException(
228 "Cannot add or remove the wildcard address");
229 if (add) {
230 for (InetSocketAddress addr : localAddresses) {
231 if (addr.getAddress().equals(address)) {
232 SctpNet.throwAlreadyBoundException();
233 }
234 }
235 } else {
236
237
238 if (localAddresses.size() <= 1)
239 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
240 boolean foundAddress = false;
241 for (InetSocketAddress addr : localAddresses) {
242 if (addr.getAddress().equals(address)) {
243 foundAddress = true;
244 break;
245 }
246 }
247 if (!foundAddress )
248 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
249 }
250
251 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
252
253
254 if (add)
255 localAddresses.add(new InetSocketAddress(address, port));
256 else {
257 for (InetSocketAddress addr : localAddresses) {
258 if (addr.getAddress().equals(address)) {
259 localAddresses.remove(addr);
260 break;
261 }
262 }
263 }
264 }
265 }
266 }
267 return this;
268 }
269
270 private boolean isBound() {
271 synchronized (stateLock) {
272 return port == -1 ? false : true;
273 }
274 }
275
276 private boolean isConnected() {
277 synchronized (stateLock) {
278 return (state == ChannelState.CONNECTED);
279 }
280 }
281
282 private void ensureOpenAndUnconnected() throws IOException {
283 synchronized (stateLock) {
284 if (!isOpen())
285 throw new ClosedChannelException();
286 if (isConnected())
287 throw new AlreadyConnectedException();
288 if (state == ChannelState.PENDING)
289 throw new ConnectionPendingException();
290 }
291 }
292
293 private boolean ensureReceiveOpen() throws ClosedChannelException {
294 synchronized (stateLock) {
295 if (!isOpen())
296 throw new ClosedChannelException();
297 if (!isConnected())
298 throw new NotYetConnectedException();
299 else
300 return true;
301 }
302 }
303
304 private void ensureSendOpen() throws ClosedChannelException {
305 synchronized (stateLock) {
306 if (!isOpen())
307 throw new ClosedChannelException();
308 if (isShutdown)
309 throw new ClosedChannelException();
310 if (!isConnected())
311 throw new NotYetConnectedException();
312 }
313 }
314
315 private void receiverCleanup() throws IOException {
316 synchronized (stateLock) {
317 receiverThread = 0;
318 if (state == ChannelState.KILLPENDING)
319 kill();
320 }
321 }
322
323 private void senderCleanup() throws IOException {
324 synchronized (stateLock) {
325 senderThread = 0;
326 if (state == ChannelState.KILLPENDING)
327 kill();
328 }
329 }
330
331 @Override
332 public Association association() throws ClosedChannelException {
333 synchronized (stateLock) {
334 if (!isOpen())
335 throw new ClosedChannelException();
336 if (!isConnected())
337 return null;
338
339 return association;
340 }
341 }
342
343 @Override
344 public boolean connect(SocketAddress endpoint) throws IOException {
345 synchronized (receiveLock) {
346 synchronized (sendLock) {
347 ensureOpenAndUnconnected();
348 InetSocketAddress isa = Net.checkAddress(endpoint);
349 SecurityManager sm = System.getSecurityManager();
350 if (sm != null)
351 sm.checkConnect(isa.getAddress().getHostAddress(),
352 isa.getPort());
353 synchronized (blockingLock()) {
354 int n = 0;
355 try {
356 try {
357 begin();
358 synchronized (stateLock) {
359 if (!isOpen()) {
360 return false;
361 }
362 receiverThread = NativeThread.current();
363 }
364 for (;;) {
365 InetAddress ia = isa.getAddress();
366 if (ia.isAnyLocalAddress())
367 ia = InetAddress.getLocalHost();
368 n = SctpNet.connect(fdVal, ia, isa.getPort());
369 if ( (n == IOStatus.INTERRUPTED)
370 && isOpen())
371 continue;
372 break;
373 }
374 } finally {
375 receiverCleanup();
376 end((n > 0) || (n == IOStatus.UNAVAILABLE));
377 assert IOStatus.check(n);
378 }
379 } catch (IOException x) {
380
381
382
383 close();
384 throw x;
385 }
386
387 if (n > 0) {
388 synchronized (stateLock) {
389
390 state = ChannelState.CONNECTED;
391 if (!isBound()) {
392 InetSocketAddress boundIsa =
393 Net.localAddress(fd);
394 port = boundIsa.getPort();
395 }
396
397
398 ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
399 try {
400 receive(buf, null, null, true);
401 } finally {
402 Util.releaseTemporaryDirectBuffer(buf);
403 }
404
405
406 try {
407 remoteAddresses = getRemoteAddresses();
408 } catch (IOException unused) { }
409
410 return true;
411 }
412 } else {
413 synchronized (stateLock) {
414
415
416 if (!isBlocking())
417 state = ChannelState.PENDING;
418 else
419 assert false;
420 }
421 }
422 }
423 return false;
424 }
425 }
426 }
427
428 @Override
429 public boolean connect(SocketAddress endpoint,
430 int maxOutStreams,
431 int maxInStreams)
432 throws IOException {
433 ensureOpenAndUnconnected();
434 return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
435 create(maxInStreams, maxOutStreams)).connect(endpoint);
436
437 }
438
439 @Override
440 public boolean isConnectionPending() {
441 synchronized (stateLock) {
442 return (state == ChannelState.PENDING);
443 }
444 }
445
446 @Override
447 public boolean finishConnect() throws IOException {
448 synchronized (receiveLock) {
449 synchronized (sendLock) {
450 synchronized (stateLock) {
451 if (!isOpen())
452 throw new ClosedChannelException();
453 if (isConnected())
454 return true;
455 if (state != ChannelState.PENDING)
456 throw new NoConnectionPendingException();
457 }
458 int n = 0;
459 try {
460 try {
461 begin();
462 synchronized (blockingLock()) {
463 synchronized (stateLock) {
464 if (!isOpen()) {
465 return false;
466 }
467 receiverThread = NativeThread.current();
468 }
469 if (!isBlocking()) {
470 for (;;) {
471 n = checkConnect(fd, false, readyToConnect);
472 if ( (n == IOStatus.INTERRUPTED)
473 && isOpen())
474 continue;
475 break;
476 }
477 } else {
478 for (;;) {
479 n = checkConnect(fd, true, readyToConnect);
480 if (n == 0) {
481
482
483 continue;
484 }
485 if ( (n == IOStatus.INTERRUPTED)
486 && isOpen())
487 continue;
488 break;
489 }
490 }
491 }
492 } finally {
493 synchronized (stateLock) {
494 receiverThread = 0;
495 if (state == ChannelState.KILLPENDING) {
496 kill();
497
498
499
500
501
502 n = 0;
503 }
504 }
505 end((n > 0) || (n == IOStatus.UNAVAILABLE));
506 assert IOStatus.check(n);
507 }
508 } catch (IOException x) {
509
510
511
512 close();
513 throw x;
514 }
515
516 if (n > 0) {
517 synchronized (stateLock) {
518 state = ChannelState.CONNECTED;
519 if (!isBound()) {
520 InetSocketAddress boundIsa =
521 Net.localAddress(fd);
522 port = boundIsa.getPort();
523 }
524
525
526 ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
527 try {
528 receive(buf, null, null, true);
529 } finally {
530 Util.releaseTemporaryDirectBuffer(buf);
531 }
532
533
534 try {
535 remoteAddresses = getRemoteAddresses();
536 } catch (IOException unused) { }
537
538 return true;
539 }
540 }
541 }
542 }
543 return false;
544 }
545
546 @Override
547 protected void implConfigureBlocking(boolean block) throws IOException {
548 IOUtil.configureBlocking(fd, block);
549 }
550
551 @Override
552 public void implCloseSelectableChannel() throws IOException {
553 synchronized (stateLock) {
554 SctpNet.preClose(fdVal);
555
556 if (receiverThread != 0)
557 NativeThread.signal(receiverThread);
558
559 if (senderThread != 0)
560 NativeThread.signal(senderThread);
561
562 if (!isRegistered())
563 kill();
564 }
565 }
566
567 @Override
568 public FileDescriptor getFD() {
569 return fd;
570 }
571
572 @Override
573 public int getFDVal() {
574 return fdVal;
575 }
576
577
578
579
580 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
581 int intOps = sk.nioInterestOps();
582 int oldOps = sk.nioReadyOps();
583 int newOps = initialOps;
584
585 if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
586
587
588
589 return false;
590 }
591
592 if ((ops & (PollArrayWrapper.POLLERR
593 | PollArrayWrapper.POLLHUP)) != 0) {
594 newOps = intOps;
595 sk.nioReadyOps(newOps);
596
597
598 readyToConnect = true;
599 return (newOps & ~oldOps) != 0;
600 }
601
602 if (((ops & PollArrayWrapper.POLLIN) != 0) &&
603 ((intOps & SelectionKey.OP_READ) != 0) &&
604 isConnected())
605 newOps |= SelectionKey.OP_READ;
606
607 if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
608 ((intOps & SelectionKey.OP_CONNECT) != 0) &&
609 ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
610 newOps |= SelectionKey.OP_CONNECT;
611 readyToConnect = true;
612 }
613
614 if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
615 ((intOps & SelectionKey.OP_WRITE) != 0) &&
616 isConnected())
617 newOps |= SelectionKey.OP_WRITE;
618
619 sk.nioReadyOps(newOps);
620 return (newOps & ~oldOps) != 0;
621 }
622
623 @Override
624 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
625 return translateReadyOps(ops, sk.nioReadyOps(), sk);
626 }
627
628 @Override
629 @SuppressWarnings("all")
630 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
631 return translateReadyOps(ops, 0, sk);
632 }
633
634 @Override
635 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
636 int newOps = 0;
637 if ((ops & SelectionKey.OP_READ) != 0)
638 newOps |= PollArrayWrapper.POLLIN;
639 if ((ops & SelectionKey.OP_WRITE) != 0)
640 newOps |= PollArrayWrapper.POLLOUT;
641 if ((ops & SelectionKey.OP_CONNECT) != 0)
642 newOps |= PollArrayWrapper.POLLCONN;
643 sk.selector.putEventOps(sk, newOps);
644 }
645
646 @Override
647 public void kill() throws IOException {
648 synchronized (stateLock) {
649 if (state == ChannelState.KILLED)
650 return;
651 if (state == ChannelState.UNINITIALIZED) {
652 state = ChannelState.KILLED;
653 return;
654 }
655 assert !isOpen() && !isRegistered();
656
657
658
659 if (receiverThread == 0 && senderThread == 0) {
660 SctpNet.close(fdVal);
661 state = ChannelState.KILLED;
662 } else {
663 state = ChannelState.KILLPENDING;
664 }
665 }
666 }
667
668 @Override
669 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
670 throws IOException {
671 if (name == null)
672 throw new NullPointerException();
673 if (!supportedOptions().contains(name))
674 throw new UnsupportedOperationException("'" + name + "' not supported");
675
676 synchronized (stateLock) {
677 if (!isOpen())
678 throw new ClosedChannelException();
679
680 SctpNet.setSocketOption(fdVal, name, value, 0 );
681 }
682 return this;
683 }
684
685 @Override
686 @SuppressWarnings("unchecked")
687 public <T> T getOption(SctpSocketOption<T> name) throws IOException {
688 if (name == null)
689 throw new NullPointerException();
690 if (!supportedOptions().contains(name))
691 throw new UnsupportedOperationException("'" + name + "' not supported");
692
693 synchronized (stateLock) {
694 if (!isOpen())
695 throw new ClosedChannelException();
696
697 return (T)SctpNet.getSocketOption(fdVal, name, 0 );
698 }
699 }
700
701 private static class DefaultOptionsHolder {
702 static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
703
704 private static Set<SctpSocketOption<?>> defaultOptions() {
705 HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
706 set.add(SCTP_DISABLE_FRAGMENTS);
707 set.add(SCTP_EXPLICIT_COMPLETE);
708 set.add(SCTP_FRAGMENT_INTERLEAVE);
709 set.add(SCTP_INIT_MAXSTREAMS);
710 set.add(SCTP_NODELAY);
711 set.add(SCTP_PRIMARY_ADDR);
712 set.add(SCTP_SET_PEER_PRIMARY_ADDR);
713 set.add(SO_SNDBUF);
714 set.add(SO_RCVBUF);
715 set.add(SO_LINGER);
716 return Collections.unmodifiableSet(set);
717 }
718 }
719
720 @Override
721 public final Set<SctpSocketOption<?>> supportedOptions() {
722 return DefaultOptionsHolder.defaultOptions;
723 }
724
725 @Override
726 public <T> MessageInfo receive(ByteBuffer buffer,
727 T attachment,
728 NotificationHandler<T> handler)
729 throws IOException {
730 return receive(buffer, attachment, handler, false);
731 }
732
733 private <T> MessageInfo receive(ByteBuffer buffer,
734 T attachment,
735 NotificationHandler<T> handler,
736 boolean fromConnect)
737 throws IOException {
738 if (buffer == null)
739 throw new IllegalArgumentException("buffer cannot be null");
740
741 if (buffer.isReadOnly())
742 throw new IllegalArgumentException("Read-only buffer");
743
744 if (receiveInvoked.get())
745 throw new IllegalReceiveException(
746 "cannot invoke receive from handler");
747 receiveInvoked.set(Boolean.TRUE);
748
749 try {
750 SctpResultContainer resultContainer = new SctpResultContainer();
751 do {
752 resultContainer.clear();
753 synchronized (receiveLock) {
754 if (!ensureReceiveOpen())
755 return null;
756
757 int n = 0;
758 try {
759 begin();
760
761 synchronized (stateLock) {
762 if(!isOpen())
763 return null;
764 receiverThread = NativeThread.current();
765 }
766
767 do {
768 n = receive(fdVal, buffer, resultContainer, fromConnect);
769 } while ((n == IOStatus.INTERRUPTED) && isOpen());
770 } finally {
771 receiverCleanup();
772 end((n > 0) || (n == IOStatus.UNAVAILABLE));
773 assert IOStatus.check(n);
774 }
775
776 if (!resultContainer.isNotification()) {
777
778 if (resultContainer.hasSomething()) {
779
780 SctpMessageInfoImpl info =
781 resultContainer.getMessageInfo();
782 synchronized (stateLock) {
783 assert association != null;
784 info.setAssociation(association);
785 }
786 return info;
787 } else
788
789 return null;
790 } else {
791 synchronized (stateLock) {
792 handleNotificationInternal(
793 resultContainer);
794 }
795 }
796
797 if (fromConnect) {
798
799
800
801
802 return null;
803 }
804 }
805 } while (handler == null ? true :
806 (invokeNotificationHandler(resultContainer, handler, attachment)
807 == HandlerResult.CONTINUE));
808
809 return null;
810 } finally {
811 receiveInvoked.set(Boolean.FALSE);
812 }
813 }
814
815 private int receive(int fd,
816 ByteBuffer dst,
817 SctpResultContainer resultContainer,
818 boolean peek)
819 throws IOException {
820 int pos = dst.position();
821 int lim = dst.limit();
822 assert (pos <= lim);
823 int rem = (pos <= lim ? lim - pos : 0);
824 if (dst instanceof DirectBuffer && rem > 0)
825 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
826
827
828 int newSize = Math.max(rem, 1);
829 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
830 try {
831 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
832 bb.flip();
833 if (n > 0 && rem > 0)
834 dst.put(bb);
835 return n;
836 } finally {
837 Util.releaseTemporaryDirectBuffer(bb);
838 }
839 }
840
841 private int receiveIntoNativeBuffer(int fd,
842 SctpResultContainer resultContainer,
843 ByteBuffer bb,
844 int rem,
845 int pos,
846 boolean peek)
847 throws IOException
848 {
849 int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
850
851 if (n > 0)
852 bb.position(pos + n);
853 return n;
854 }
855
856 private InternalNotificationHandler<?> internalNotificationHandler =
857 new InternalNotificationHandler();
858
859 private void handleNotificationInternal(SctpResultContainer resultContainer)
860 {
861 invokeNotificationHandler(resultContainer,
862 internalNotificationHandler, null);
863 }
864
865 private class InternalNotificationHandler<T>
866 extends AbstractNotificationHandler<T>
867 {
868 @Override
869 public HandlerResult handleNotification(
870 AssociationChangeNotification not, T unused) {
871 if (not.event().equals(
872 AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
873 association == null) {
874 SctpAssocChange sac = (SctpAssocChange) not;
875 association = new SctpAssociationImpl
876 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
877 }
878 return HandlerResult.CONTINUE;
879 }
880 }
881
882 private <T> HandlerResult invokeNotificationHandler
883 (SctpResultContainer resultContainer,
884 NotificationHandler<T> handler,
885 T attachment) {
886 SctpNotification notification = resultContainer.notification();
887 synchronized (stateLock) {
888 notification.setAssociation(association);
889 }
890
891 if (!(handler instanceof AbstractNotificationHandler)) {
892 return handler.handleNotification(notification, attachment);
893 }
894
895
896 AbstractNotificationHandler absHandler =
897 (AbstractNotificationHandler)handler;
898 switch(resultContainer.type()) {
899 case ASSOCIATION_CHANGED :
900 return absHandler.handleNotification(
901 resultContainer.getAssociationChanged(), attachment);
902 case PEER_ADDRESS_CHANGED :
903 return absHandler.handleNotification(
904 resultContainer.getPeerAddressChanged(), attachment);
905 case SEND_FAILED :
906 return absHandler.handleNotification(
907 resultContainer.getSendFailed(), attachment);
908 case SHUTDOWN :
909 return absHandler.handleNotification(
910 resultContainer.getShutdown(), attachment);
911 default :
912
913 return absHandler.handleNotification(
914 resultContainer.notification(), attachment);
915 }
916 }
917
918 private void checkAssociation(Association sendAssociation) {
919 synchronized (stateLock) {
920 if (sendAssociation != null && !sendAssociation.equals(association)) {
921 throw new IllegalArgumentException(
922 "Cannot send to another association");
923 }
924 }
925 }
926
927 private void checkStreamNumber(int streamNumber) {
928 synchronized (stateLock) {
929 if (association != null) {
930 if (streamNumber < 0 ||
931 streamNumber >= association.maxOutboundStreams())
932 throw new InvalidStreamException();
933 }
934 }
935 }
936
937
938
939
940
941 @Override
942 public int send(ByteBuffer buffer, MessageInfo messageInfo)
943 throws IOException {
944 if (buffer == null)
945 throw new IllegalArgumentException("buffer cannot be null");
946
947 if (messageInfo == null)
948 throw new IllegalArgumentException("messageInfo cannot be null");
949
950 checkAssociation(messageInfo.association());
951 checkStreamNumber(messageInfo.streamNumber());
952
953 synchronized (sendLock) {
954 ensureSendOpen();
955
956 int n = 0;
957 try {
958 begin();
959
960 synchronized (stateLock) {
961 if(!isOpen())
962 return 0;
963 senderThread = NativeThread.current();
964 }
965
966 do {
967 n = send(fdVal, buffer, messageInfo);
968 } while ((n == IOStatus.INTERRUPTED) && isOpen());
969
970 return IOStatus.normalize(n);
971 } finally {
972 senderCleanup();
973 end((n > 0) || (n == IOStatus.UNAVAILABLE));
974 assert IOStatus.check(n);
975 }
976 }
977 }
978
979 private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
980 throws IOException {
981 int streamNumber = messageInfo.streamNumber();
982 SocketAddress target = messageInfo.address();
983 boolean unordered = messageInfo.isUnordered();
984 int ppid = messageInfo.payloadProtocolID();
985
986 if (src instanceof DirectBuffer)
987 return sendFromNativeBuffer(fd, src, target, streamNumber,
988 unordered, ppid);
989
990
991 int pos = src.position();
992 int lim = src.limit();
993 assert (pos <= lim && streamNumber >= 0);
994
995 int rem = (pos <= lim ? lim - pos : 0);
996 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
997 try {
998 bb.put(src);
999 bb.flip();
1000
1001 src.position(pos);
1002
1003 int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1004 unordered, ppid);
1005 if (n > 0) {
1006
1007 src.position(pos + n);
1008 }
1009 return n;
1010 } finally {
1011 Util.releaseTemporaryDirectBuffer(bb);
1012 }
1013 }
1014
1015 private int sendFromNativeBuffer(int fd,
1016 ByteBuffer bb,
1017 SocketAddress target,
1018 int streamNumber,
1019 boolean unordered,
1020 int ppid)
1021 throws IOException {
1022 int pos = bb.position();
1023 int lim = bb.limit();
1024 assert (pos <= lim);
1025 int rem = (pos <= lim ? lim - pos : 0);
1026
1027 int written = send0(fd, ((DirectBuffer)bb).address() + pos,
1028 rem, target, -1 , streamNumber, unordered, ppid);
1029 if (written > 0)
1030 bb.position(pos + written);
1031 return written;
1032 }
1033
1034 @Override
1035 public SctpChannel shutdown() throws IOException {
1036 synchronized(stateLock) {
1037 if (isShutdown)
1038 return this;
1039
1040 ensureSendOpen();
1041 SctpNet.shutdown(fdVal, -1);
1042 if (senderThread != 0)
1043 NativeThread.signal(senderThread);
1044 isShutdown = true;
1045 }
1046 return this;
1047 }
1048
1049 @Override
1050 public Set<SocketAddress> getAllLocalAddresses()
1051 throws IOException {
1052 synchronized (stateLock) {
1053 if (!isOpen())
1054 throw new ClosedChannelException();
1055 if (!isBound())
1056 return Collections.EMPTY_SET;
1057
1058 return SctpNet.getLocalAddresses(fdVal);
1059 }
1060 }
1061
1062 @Override
1063 public Set<SocketAddress> getRemoteAddresses()
1064 throws IOException {
1065 synchronized (stateLock) {
1066 if (!isOpen())
1067 throw new ClosedChannelException();
1068 if (!isConnected() || isShutdown)
1069 return Collections.EMPTY_SET;
1070
1071 try {
1072 return SctpNet.getRemoteAddresses(fdVal, 0);
1073 } catch (SocketException unused) {
1074
1075 return remoteAddresses;
1076 }
1077 }
1078 }
1079
1080
1081 private static native void initIDs();
1082
1083 static native int receive0(int fd, SctpResultContainer resultContainer,
1084 long address, int length, boolean peek) throws IOException;
1085
1086 static native int send0(int fd, long address, int length,
1087 SocketAddress target, int assocId, int streamNumber,
1088 boolean unordered, int ppid) throws IOException;
1089
1090 private static native int checkConnect(FileDescriptor fd, boolean block,
1091 boolean ready) throws IOException;
1092
1093 static {
1094 Util.load();
1095 java.security.AccessController.doPrivileged(
1096 new sun.security.action.LoadLibraryAction("sctp"));
1097 initIDs();
1098 }
1099 }